package com.youhome.analysis.config;

import com.youhome.analysis.listener.CustomerJobListener;
import com.youhome.analysis.model.CustomerType;
import com.youhome.analysis.processor.CustomerItemProcessor;
import com.youhome.analysis.validator.CustomerBeanValidator;
import com.youhome.analysis.writer.CustomerItemWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.validator.Validator;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;


/**
 * create by jack 2017/10/28
 * 配置类
 */
//@Configuration
//@EnableBatchProcessing
public class CustomerBatchConfig {

    @Autowired
    private CustomerItemWriter customerItemWriter;

    /**
     * ItemReader定义,用来读取数据
     * 1，使用FlatFileItemReader读取文件
     * 2，使用FlatFileItemReader的setResource方法设置csv文件的路径
     * 3，对此对cvs文件的数据和领域模型类做对应映射
     *
     * @return
     * @throws Exception
     */
    @Bean
    public ItemReader<CustomerType> reader() throws Exception {
        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
        xStreamMarshaller.setAnnotatedClasses(CustomerType.class);
        StaxEventItemReader<CustomerType> xmlItemReader = new StaxEventItemReaderBuilder<CustomerType>()
                .name("xmlItemReader")
                .resource(new ClassPathResource("data/customers.xml"))
                .addFragmentRootElements("customer")
                .unmarshaller(xStreamMarshaller)
                .build();
        return xmlItemReader;
    }


    /**
     * ItemProcessor定义，用来处理数据
     *
     * @return
     */
    @Bean
    public ItemProcessor<CustomerType, CustomerType> processor() {
        //使用我们自定义的ItemProcessor的实现CsvItemProcessor
        CustomerItemProcessor processor = new CustomerItemProcessor();
        //为processor指定校验器为CsvBeanValidator()
        processor.setValidator(customerBeanValidator());
        return processor;
    }

    /**
     * JobRepository，用来注册Job的容器
     * jobRepositor的定义需要dataSource和transactionManager，Spring Boot已为我们自动配置了
     * 这两个类，Spring可通过方法注入已有的Bean
     *
     * @param dataSource
     * @param transactionManager
     * @return
     * @throws Exception
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws
            Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * JobLauncher定义，用来启动Job的接口
     *
     * @param dataSource
     * @param transactionManager
     * @return
     * @throws Exception
     */
    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws
            Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    /**
     * Job定义，我们要实际执行的任务，包含一个或多个Step
     *
     * @param jobBuilderFactory
     * @param s1
     * @return
     */
    @Bean
    public Job importJob(JobBuilderFactory jobBuilderFactory, Step s1) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)//为Job指定Step
                .end()
                .listener(customerJobListener())//绑定监听器csvJobListener
                .build();
    }

    /**
     * step步骤，包含ItemReader，ItemProcessor和ItemWriter
     *
     * @param stepBuilderFactory
     * @param reader
     * @param writer
     * @param processor
     * @return
     */
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<CustomerType> reader,
                      ItemWriter<CustomerType> writer, ItemProcessor<CustomerType, CustomerType> processor) {
        return stepBuilderFactory
                .get("step1")
                .<CustomerType, CustomerType>chunk(65000)//批处理每次提交65000条数据
                .reader(reader)//给step绑定reader
                .processor(processor)//给step绑定processor
                .writer(customerItemWriter)//给step绑定writer
                .build();
    }

    @Bean
    public CustomerJobListener customerJobListener() {
        return new CustomerJobListener();
    }

    @Bean
    public Validator<CustomerType> customerBeanValidator() {
        return new CustomerBeanValidator<>();
    }


}
